package com.yuzhi.fine.thread;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue
 * @author tianshaojie
 * @date 2018/1/3
 */
public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    private static class PriorityQueue<E> {
        private final Comparator<? super E> comparator;
        private final E[] objects;

        private int head = 0;
        private int tail = 0;

        @SuppressWarnings("unchecked")
        public PriorityQueue(int capacity, Comparator<? super E> comparator) {
            this.objects = (E[])new Object[capacity];
            this.comparator = comparator;
        }

        public void add(E elem) {
            if (tail == objects.length) {
                // shift down |-----AAAAAAA|
                tail -= head;
                System.arraycopy(objects, head, objects, 0, tail);
                head = 0;
            }

            if (tail == head || comparator.compare(objects[tail - 1], elem) <= 0) {
                // Append
                objects[tail++] = elem;
            } else if (head > 0 && comparator.compare(objects[head], elem) > 0) {
                // Prepend
                objects[--head] = elem;
            } else {
                // Insert in the middle
                int index = upperBound(head, tail - 1, elem);
                System.arraycopy(objects, index, objects, index + 1, tail - index);
                objects[index] = elem;
                tail++;
            }
        }

        public E peek() {
            return (head != tail) ? objects[head] : null;
        }

        public E poll() {
            E elem = objects[head];
            objects[head] = null;
            head = (head + 1) % objects.length;
            if (head == 0) tail = 0;
            return elem;
        }

        public int size() {
            return tail - head;
        }

        public Comparator<? super E> comparator() {
            return this.comparator;
        }

        public boolean contains(Object o) {
            for (int i = head; i < tail; ++i) {
                if (objects[i] == o) {
                    return true;
                }
            }
            return false;
        }

        public int remainingCapacity() {
            return this.objects.length - (tail - head);
        }

        private int upperBound(int start, int end, E key) {
            while (start < end) {
                int mid = (start + end) >>> 1;
                E mitem = objects[mid];
                int cmp = comparator.compare(mitem, key);
                if (cmp > 0) {
                    end = mid;
                } else {
                    start = mid + 1;
                }
            }
            return start;
        }
    }


    // Lock used for all operations
    private final ReentrantLock lock = new ReentrantLock();

    // Condition for blocking when empty
    private final Condition notEmpty = lock.newCondition();

    // Wait queue for waiting puts
    private final Condition notFull = lock.newCondition();

    private final PriorityQueue<E> queue;

    /**
     * Creates a PriorityQueue with the specified capacity that orders its
     * elements according to the specified comparator.
     * @param capacity the capacity of this queue
     * @param comparator the comparator that will be used to order this priority queue
     */
    public BoundedPriorityBlockingQueue(int capacity,
                                        Comparator<? super E> comparator) {
        this.queue = new PriorityQueue<>(capacity, comparator);
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();

        lock.lock();
        try {
            if (queue.remainingCapacity() > 0) {
                this.queue.add(e);
                notEmpty.signal();
                return true;
            }
        } finally {
            lock.unlock();
        }
        return false;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();

        lock.lock();
        try {
            while (queue.remainingCapacity() == 0) {
                notFull.await();
            }
            this.queue.add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);

        lock.lockInterruptibly();
        try {
            while (queue.remainingCapacity() == 0) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            this.queue.add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

    public E take() throws InterruptedException {
        E result = null;
        lock.lockInterruptibly();
        try {
            while (queue.size() == 0) {
                notEmpty.await();
            }
            result = queue.poll();
            notFull.signal();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public E poll() {
        E result = null;
        lock.lock();
        try {
            if (queue.size() > 0) {
                result = queue.poll();
                notFull.signal();
            }
        } finally {
            lock.unlock();
        }
        return result;
    }

    public E poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        E result = null;
        try {
            while (queue.size() == 0 && nanos > 0) {
                nanos = notEmpty.awaitNanos(nanos);
            }
            if (queue.size() > 0) {
                result = queue.poll();
            }
            notFull.signal();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public E peek() {
        lock.lock();
        try {
            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public Iterator<E> iterator() {
        throw new UnsupportedOperationException();
    }

    public Comparator<? super E> comparator() {
        return queue.comparator();
    }

    public int remainingCapacity() {
        lock.lock();
        try {
            return queue.remainingCapacity();
        } finally {
            lock.unlock();
        }
    }

    public boolean remove(Object o) {
        throw new UnsupportedOperationException();
    }

    public boolean contains(Object o) {
        lock.lock();
        try {
            return queue.contains(o);
        } finally {
            lock.unlock();
        }
    }

    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = Math.min(queue.size(), maxElements);
            for (int i = 0; i < n; ++i) {
                c.add(queue.poll());
            }
            return n;
        } finally {
            lock.unlock();
        }
    }
}